Goroutine 在项目中的实践

Goroutine 是Golang语言的一大特色,Goroutine的出现,使得并发得到大幅提升。我们一起看下Goroutine在项目中的实践。

Goroutine并发控制

在业务开发中,会碰到几个相互独立的耗时操作,可以并行执行,这个时候Goroutine是很方便派上用场的。如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// someOperation your work to do
// if we have some data to return use channel to pass data
func someOperation() error {
time.Sleep(1 * time.Second)
return nil
}

// anotherOperation
// another work indenpendent with someOperation
func anotherOperation() error {
time.Sleep(1 * time.Second)
return nil
}

func bizFunc() error {
wg := sync.WaitGroup{} // sync.WatiGroup to sync goroutine
wg.Add(2) // we have 2 operation to do, so we add 2

go func() {
err := someOperation()
if err != nil {
// whatever handler
}
wg.Done()
}()

go func(){
err := anotherOperation()
wg.Done()
}()
wg.Wait() // wait all goroutine to return
// other operation depend on the two before
}

Gorotine 最大个数

上面的案例需要我们知道协程的数量,然后等待所有协程结束,那如果我们不确定协程的个数或者我们需要设置固定个数的协程,该如何做呢?

其实也很简单,利用channel的阻塞特性,创建一个固定长度的channel,创建一个协程,在channel中写入一条数据,当channel被填满后,就会阻塞;协程结束后,从channle中消费一条数据,协程就又可以写入数据,如此可固定协程的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// wrapped for wait group

import (
"context"
"sync"
)

const defaultSize = 32

// SizeWaitGroup the struct control limit of waitgroup
type SizeWaitGroup struct {
buf chan struct{} // buffer to buf the current number of goroutines
wg sync.WaitGroup // the real wait group
}

// NewSizeWaitGroup wait group with limit
func NewSizeWaitGroup(size int) *SizeWaitGroup {
if size <= 0 {
size = defaultSize
}
return &SizeWaitGroup{
buf: make(chan struct{}, size), // init the size of channel
wg: sync.WaitGroup{},
}
}

// Add
func (c *SizeWaitGroup) Add() {
_ = c.AddWithContext(context.Background())
}

// AddWithContext
// blocking if the number of goroutines has been reached
func (c *SizeWaitGroup) AddWithContext(ctx context.Context) error {
//
select {
case <-ctx.Done(): // parent goroutines call canceled or timedout or other happend
return ctx.Err()
case c.buf <- struct{}{}: // block if channel is full
break
}
c.wg.Add(1) // we created a goroutine
return nil
}

// Done
func (c *SizeWaitGroup) Done() {
<-c.buf // a goroutine finished
c.wg.Done()
}

// Wait
func (c *SizeWaitGroup) Wait() {
c.wg.Wait()
}

如上代码所示,创建一个固定长度的channel,添加协程之前先往队列里增加一个占位符(struct{} 结构不占用内存,协程数量大时不会太占用内存),然后再调用真正的WaitGroup增加协程控制,执行完成后调用Done方法,从队列中取出占位符调用真正的WaitGroup的Done函数。

调用如下:

1
2
3
4
5
6
7
8
9
10
swg := NewSizeWaitGroup(128)
for index := 0; index < 1000; index += 1 {
swg.Add()
go func() {
// do what you want
swg.Done()
}
}

swg.Wait()

这样,协程的最大数量会保持在128个。

总结

Golang提供的channel与Goroutine 提供很方便的通信与并发功能,在实际的业务开发中,可以很方便讲相互独立的功能并发处理,提高系统的吞吐量。